-
Notifications
You must be signed in to change notification settings - Fork 502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ingest: Make LedgerTransactionReader
seekable and lazy
#5274
Conversation
LedgerTransactionReader
LedgerTransactionReader
@Shaptic I don't think it's necessary to create a lazy variant of the LedgerTransactionReader. Instead, you can refactor the existing LedgerTransactionReader and it should be sufficient for your use case: diff --git a/ingest/ledger_transaction_reader.go b/ingest/ledger_transaction_reader.go
index 819930994..9f816f0ed 100644
--- a/ingest/ledger_transaction_reader.go
+++ b/ingest/ledger_transaction_reader.go
@@ -15,7 +15,7 @@ import (
// Use NewTransactionReader to create a new instance.
type LedgerTransactionReader struct {
ledgerCloseMeta xdr.LedgerCloseMeta
- transactions []LedgerTransaction
+ byHash map[xdr.Hash]xdr.TransactionEnvelope
readIdx int
}
@@ -53,56 +53,53 @@ func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry
// Read returns the next transaction in the ledger, ordered by tx number, each time
// it is called. When there are no more transactions to return, an EOF error is returned.
func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) {
- if reader.readIdx < len(reader.transactions) {
+ if reader.readIdx < len(reader.byHash) {
+ i := reader.readIdx
+ hash := reader.ledgerCloseMeta.TransactionHash(i)
+ envelope, ok := reader.byHash[hash]
+ if !ok {
+ hexHash := hex.EncodeToString(hash[:])
+ return LedgerTransaction{}, errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
+ }
+
+ // We check the version only if FeeProcessing are non empty because some backends
+ // (like HistoryArchiveBackend) do not return meta.
+ if reader.ledgerCloseMeta.ProtocolVersion() < 10 && reader.ledgerCloseMeta.TxApplyProcessing(i).V < 2 &&
+ len(reader.ledgerCloseMeta.FeeProcessing(i)) > 0 {
+ return LedgerTransaction{}, errors.New(
+ "TransactionMeta.V=2 is required in protocol version older than version 10. " +
+ "Please process ledgers again using the latest stellar-core version.",
+ )
+ }
+
reader.readIdx++
- return reader.transactions[reader.readIdx-1], nil
+ return LedgerTransaction{
+ Index: uint32(i + 1), // Transactions start at '1'
+ Envelope: envelope,
+ Result: reader.ledgerCloseMeta.TransactionResultPair(i),
+ UnsafeMeta: reader.ledgerCloseMeta.TxApplyProcessing(i),
+ FeeChanges: lcm.FeeProcessing(i),
+ LedgerVersion: uint32(reader.ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.LedgerVersion),
+ }, nil
}
return LedgerTransaction{}, io.EOF
}
-// Rewind resets the reader back to the first transaction in the ledger
-func (reader *LedgerTransactionReader) Rewind() {
- reader.readIdx = 0
+// Seek moves the reader to the given position
+func (reader *LedgerTransactionReader) Seek(index int) {
+ reader.readIdx = index
}
// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide
// a per-transaction view of the data when Read() is called.
func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta, networkPassphrase string) error {
- byHash := map[xdr.Hash]xdr.TransactionEnvelope{}
+ reader.byHash = map[xdr.Hash]xdr.TransactionEnvelope{}
for i, tx := range lcm.TransactionEnvelopes() {
hash, err := network.HashTransactionInEnvelope(tx, networkPassphrase)
if err != nil {
return errors.Wrapf(err, "could not hash transaction %d in TxSet", i)
}
- byHash[hash] = tx
- }
-
- for i := 0; i < lcm.CountTransactions(); i++ {
- hash := lcm.TransactionHash(i)
- envelope, ok := byHash[hash]
- if !ok {
- hexHash := hex.EncodeToString(hash[:])
- return errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
- }
-
- // We check the version only if FeeProcessing are non empty because some backends
- // (like HistoryArchiveBackend) do not return meta.
- if lcm.ProtocolVersion() < 10 && lcm.TxApplyProcessing(i).V < 2 &&
- len(lcm.FeeProcessing(i)) > 0 {
- return errors.New(
- "TransactionMeta.V=2 is required in protocol version older than version 10. " +
- "Please process ledgers again using the latest stellar-core version.",
- )
- }
-
- reader.transactions = append(reader.transactions, LedgerTransaction{
- Index: uint32(i + 1), // Transactions start at '1'
- Envelope: envelope,
- Result: lcm.TransactionResultPair(i),
- UnsafeMeta: lcm.TxApplyProcessing(i),
- FeeChanges: lcm.FeeProcessing(i),
- LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
- })
+ reader.byHash[hash] = tx
}
return nil
}
@@ -111,6 +108,6 @@ func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta
// helpful when there are still some transactions available so reader can stop
// streaming them.
func (reader *LedgerTransactionReader) Close() error {
- reader.transactions = nil
+ reader.byHash = nil
return nil
}
|
@tamirms yeah I considered adapting the existing reader, but I was concerned about possible side-effects to other systems. I figured it would make it easier to iterate and debug, with a possible goal of later merging the two once the indexed design experiment (Approach 3 in the design doc) is sufficiently validated. |
@Shaptic I prefer the approach of having only one implementation of the transaction reader and creating a new variant only if necessary. The transaction reader interface is well defined and the code changes to I think creating another transaction reader implementation can actually introduce more bugs because you can forget to copy over parts from |
07a9d40
to
a619124
Compare
Those are all great points! Like I said, that was going to be my initial approach, but it felt risky to touch at all. I've done as you suggested in 2cd0d68, please have a look. I just want to emphasize one bit again:
As you can see in the diff, that's definitely true, but we're introducing the following set of performance differences for all cases, even when we don't want to be "lazy":
I don't think any of these should be problematic or noticeable, but I did want to call them out for visibility and posterity. |
2e0b571
to
5495ffc
Compare
Halve memory usage by keeping refs to envelopes, instead
5495ffc
to
cb39755
Compare
cb39755
to
2cd0d68
Compare
we could benchmark the operations you've listed to gain some confidence. But my guess is that difference in performance will be on the order of microseconds per operation |
LedgerTransactionReader
LedgerTransactionReader
seekable and lazy
What
This adds a new way to read transactions from
LedgerCloseMeta
, making two key changes to the existingingest.LedgerTransactionReader
:ingest.LedgerTransaction
instances when they are requested rather than all at once as it is today. They are still cached internally, though, so subsequent re-reads will not need to convert.Why
This is to enable on-the-fly ingestion of transaction from specific instances of
LedgerCloseMeta
as part of stellar/stellar-rpc#115.Known limitations
In order to keep the scope of the implementation tight and specific to our current needs, this only works with a
xdr.LedgerCloseMeta
instance (rather than a backend) and only works with Protocol 20+.